package com.tca.common.learning.webflux.reactor.start;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

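/**
 * Demonstrates Reactor schedulers and backpressure.
 *
 * <p>A {@code Scheduler} is Reactor's abstraction for deciding where work runs, and it has
 * several implementations. {@code Schedulers} plays a role similar to {@code Executors}: it is
 * a factory for the various {@code Scheduler} implementations.
 *
 * <ul>
 *   <li>{@code Schedulers.single()} and {@code Schedulers.newSingle()} roughly correspond to
 *       {@code Executors.newSingleThreadExecutor()}</li>
 *   <li>{@code Schedulers.elastic()} and {@code Schedulers.newElastic()} roughly correspond to
 *       {@code Executors.newCachedThreadPool()}</li>
 *   <li>{@code Schedulers.parallel()} and {@code Schedulers.newParallel()} roughly correspond to
 *       {@code Executors.newFixedThreadPool()}</li>
 * </ul>
 */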
public class SchedulersTest {

    /**
     * Entry point for the demos.
     *
     * <p>With no arguments the backpressure demo runs. Passing {@code async} as the first
     * argument runs the sync-to-async demo instead.
     *
     * @param args optional; {@code args[0]} equal to {@code "async"} (case-insensitive) selects
     *             the sync-to-async demo, anything else selects the backpressure demo
     * @throws InterruptedException if the main thread is interrupted while waiting for the
     *                              async demo to finish
     */
    public static void main(String[] args) throws InterruptedException {
        boolean runAsyncDemo = args != null && args.length > 0 && "async".equalsIgnoreCase(args[0]);
        if (runAsyncDemo) {
            // Scheduling demo.
            sync2Async();
        } else {
            // Backpressure demo.
            backpressure();
        }
    }

    /**
     * Turns a blocking call into an asynchronous one.
     *
     * <p>A blocking {@code Callable} is wrapped in a {@code Mono} and subscribed on a scheduler,
     * while the calling thread waits on a latch (for at most 10 seconds) until the task has
     * either completed or failed.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void sync2Async() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // 1. Declare a Mono backed by a Callable; the Callable blocks for two seconds.
        Mono.fromCallable(() -> {
                    TimeUnit.SECONDS.sleep(2L);
                    return "hello async";
                })
                // 2. subscribeOn hands the task to the scheduler returned by Schedulers.elastic(),
                //    so the blocking Callable does not run on the calling thread.
                // NOTE(review): Schedulers.elastic() is deprecated in newer Reactor releases in
                // favor of Schedulers.boundedElastic() - switch if the project's Reactor version
                // provides it.
                .subscribeOn(Schedulers.elastic())
                .subscribe(
                        System.out::println,
                        error -> {
                            // The completion callback is not invoked on failure, so the latch
                            // must be released here too or the caller waits for the full timeout.
                            System.err.println("Async task failed: " + error);
                            countDownLatch.countDown();
                        },
                        countDownLatch::countDown);
        if (!countDownLatch.await(10, TimeUnit.SECONDS)) {
            System.err.println("Timed out waiting for the async task to finish");
        }
    }

    /**
     * Backpressure demo: the subscriber pulls one element at a time from a six-element range,
     * pausing for a second before handling each element.
     */
    private static void backpressure() {
        // Flux.range emits the integers 1 through 6.
        Flux.range(1, 6)
                // Print every request amount that travels upstream.
                .doOnRequest(n -> System.out.println("Request " + n + " values..."))
                // BaseSubscriber lets the subscriber decide how much to request and when.
                .subscribe(new BaseSubscriber<Integer>() {
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        System.out.println("Subscribed and make a request...");
                        // Ask for the first element only.
                        request(1);
                    }

                    @Override
                    protected void hookOnNext(Integer value) {
                        try {
                            // Pause for one second before handling the element.
                            TimeUnit.SECONDS.sleep(1);
                        } catch (InterruptedException e) {
                            // Restore the interrupt status so code further up the stack can
                            // still observe the interruption.
                            Thread.currentThread().interrupt();
                            System.err.println("Interrupted while processing value [" + value + "]");
                        }
                        System.out.println("Get value [" + value + "]");
                        // Ask for the next element only after this one has been handled.
                        request(1);
                    }
                });
    }

}

